#  Copyright 2025 Collate
#  Licensed under the Collate Community License, Version 1.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#  https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

"""
OpenMetadata utils tests
"""
import base64
import json
from unittest import TestCase

from metadata.generated.schema.entity.data.apiEndpoint import APIEndpoint
from metadata.generated.schema.entity.data.dashboard import Dashboard
from metadata.generated.schema.entity.data.dashboardDataModel import DashboardDataModel
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.entity.data.mlmodel import MlModel
from metadata.generated.schema.entity.data.pipeline import Pipeline
from metadata.generated.schema.entity.data.searchIndex import SearchIndex
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.data.topic import Topic
from metadata.generated.schema.entity.services.dashboardService import DashboardService
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.messagingService import MessagingService
from metadata.generated.schema.entity.services.pipelineService import PipelineService
from metadata.generated.schema.entity.teams.user import User
from metadata.generated.schema.type import basic
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.connections.headers import render_query_header
from metadata.ingestion.models.topology import (
    get_entity_hierarchy,
    get_entity_hierarchy_depth,
)
from metadata.ingestion.ometa.utils import (
    build_entity_reference,
    decode_jwt_token,
    format_name,
    get_entity_type,
    model_str,
)
from metadata.utils.constants import ENTITY_REFERENCE_CLASS_MAP

# Minimal Table fixture (three columns plus a databaseSchema reference)
# used by the entity-reference test further down in this module.
MOCK_TABLE = Table(
    id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
    name="customers",
    description="description\nwith new line",
    tableType="Regular",
    columns=[
        Column(name=column_name, dataType=column_type)
        for column_name, column_type in (
            ("customer_id", "INT"),
            ("first_name", "STRING"),
            ("last_name", "STRING"),
        )
    ],
    databaseSchema=EntityReference(
        id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb", type="databaseSchema"
    ),
)


class OMetaUtilsTest(TestCase):
    """
    Unit tests for the OMeta helper functions (name formatting, entity type
    resolution, model stringification, entity references, JWT payload
    decoding), the query header renderer and the entity hierarchy helpers.
    """

    def test_format_name(self):
        """
        Check we are properly formatting names
        """

        self.assertEqual(format_name("random"), "random")
        self.assertEqual(format_name("ran dom"), "ran_dom")
        self.assertEqual(format_name("ran_(dom"), "ran__dom")

    def test_get_entity_type(self):
        """
        Check that we return a string or the class name
        """

        self.assertEqual(get_entity_type("hello"), "hello")
        self.assertEqual(get_entity_type(MlModel), "mlmodel")

    def test_get_entity_type_camel_case_consistency(self):
        """
        Regression test for Issue #20838: get_entity_type must return the
        camelCase entity type that matches the ENTITY_REFERENCE_CLASS_MAP
        keys, not a fully lowercased version of the class name.
        """

        # (entity class, entity type expected as ENTITY_REFERENCE_CLASS_MAP key)
        test_cases = [
            (APIEndpoint, "apiEndpoint"),
            (DashboardDataModel, "dashboardDataModel"),
            (DatabaseSchema, "databaseSchema"),
            (SearchIndex, "searchIndex"),
            (DatabaseService, "databaseService"),
            # Single-word name: lowercase and camelCase forms coincide
            (User, "user"),
        ]

        for entity_class, expected_type in test_cases:
            with self.subTest(entity_class=entity_class.__name__):
                result = get_entity_type(entity_class)
                self.assertEqual(
                    result,
                    expected_type,
                    f"get_entity_type({entity_class.__name__}) returned '{result}' "
                    f"but should return '{expected_type}' to match ENTITY_REFERENCE_CLASS_MAP",
                )

                # Verify the expected type exists in ENTITY_REFERENCE_CLASS_MAP
                self.assertIn(
                    expected_type,
                    ENTITY_REFERENCE_CLASS_MAP,
                    f"Expected type '{expected_type}' should exist in ENTITY_REFERENCE_CLASS_MAP",
                )

    def test_model_str(self):
        """
        Return Uuid as str
        """

        self.assertEqual(model_str("random"), "random")
        self.assertEqual(
            model_str(basic.Uuid("9fc58e81-7412-4023-a298-59f2494aab9d")),
            "9fc58e81-7412-4023-a298-59f2494aab9d",
        )

        self.assertEqual(model_str(basic.EntityName("EntityName")), "EntityName")
        self.assertEqual(model_str(basic.FullyQualifiedEntityName("FQDN")), "FQDN")

    def test_render_query_headers_builds_the_right_string(self) -> None:
        """Check the query header is rendered as a SQL comment wrapping JSON"""
        # assertEqual (rather than a bare assert) keeps this check active under
        # `python -O` and consistent with the rest of the class.
        self.assertEqual(
            render_query_header("0.0.1"),
            '/* {"app": "OpenMetadata", "version": "0.0.1"} */',
        )

    def test_build_entity_reference(self) -> None:
        """Check we're building the right class"""
        res = build_entity_reference(MOCK_TABLE)
        self.assertEqual(res.type, "table")
        self.assertEqual(res.id, MOCK_TABLE.id)

    def test_decode_jwt_token_valid(self):
        """Test decoding a valid JWT token"""
        # Create a mock JWT payload
        payload = {
            "sub": "testuser",
            "email": "testuser@example.com",
            "name": "Test User",
            "iat": 1640995200,
            "exp": 1641081600,
        }

        # Encode the payload
        payload_encoded = (
            base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8"))
            .decode("utf-8")
            .rstrip("=")
        )

        # Create a mock JWT token (header.payload.signature)
        jwt_token = f"eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.{payload_encoded}.signature"

        result = decode_jwt_token(jwt_token)

        self.assertIsNotNone(result)
        self.assertEqual(result["sub"], "testuser")
        self.assertEqual(result["email"], "testuser@example.com")
        self.assertEqual(result["name"], "Test User")

    def test_decode_jwt_token_with_padding(self):
        """Test decoding a JWT token whose payload had its base64 padding stripped"""
        # This payload serializes to 52 bytes, which is not a multiple of 3, so
        # its base64 form ends with "=" padding that the JWT format drops.
        payload = {"sub": "admin", "email": "admin@open-metadata.org"}

        padded_payload = base64.urlsafe_b64encode(
            json.dumps(payload).encode("utf-8")
        ).decode("utf-8")

        # Encode without padding
        payload_encoded = padded_payload.rstrip("=")

        # Guard the precondition: if stripping removed nothing, this test would
        # silently stop covering the padding-restoration path.
        self.assertNotEqual(
            padded_payload,
            payload_encoded,
            "Test payload must require base64 padding",
        )

        jwt_token = f"header.{payload_encoded}.signature"

        result = decode_jwt_token(jwt_token)

        self.assertIsNotNone(result)
        self.assertEqual(result["sub"], "admin")
        self.assertEqual(result["email"], "admin@open-metadata.org")

    def test_decode_jwt_token_invalid_format(self):
        """Test decoding an invalid JWT token format"""
        # Test with wrong number of parts
        invalid_token = "header.payload"  # Missing signature
        result = decode_jwt_token(invalid_token)
        self.assertIsNone(result)

        # Test with too many parts
        invalid_token = "header.payload.signature.extra"
        result = decode_jwt_token(invalid_token)
        self.assertIsNone(result)

    def test_decode_jwt_token_invalid_base64(self):
        """Test decoding a JWT token with invalid base64 in payload"""
        invalid_token = "header.invalid-base64.signature"
        result = decode_jwt_token(invalid_token)
        self.assertIsNone(result)

    def test_decode_jwt_token_invalid_json(self):
        """Test decoding a JWT token with invalid JSON in payload"""
        # Create invalid JSON payload
        invalid_json = "invalid json content"
        payload_encoded = base64.urlsafe_b64encode(invalid_json.encode("utf-8")).decode(
            "utf-8"
        )

        jwt_token = f"header.{payload_encoded}.signature"

        result = decode_jwt_token(jwt_token)
        self.assertIsNone(result)

    def test_decode_jwt_token_empty_payload(self):
        """Test decoding a JWT token with empty payload"""
        # Create empty payload
        payload_encoded = base64.urlsafe_b64encode(
            json.dumps({}).encode("utf-8")
        ).decode("utf-8")

        jwt_token = f"header.{payload_encoded}.signature"

        result = decode_jwt_token(jwt_token)
        self.assertIsNotNone(result)
        self.assertEqual(result, {})

    def test_decode_jwt_token_none_input(self):
        """Test decoding with None input"""
        result = decode_jwt_token(None)
        self.assertIsNone(result)

    def test_decode_jwt_token_empty_string(self):
        """Test decoding with empty string input"""
        result = decode_jwt_token("")
        self.assertIsNone(result)

    def test_decode_jwt_token_real_world_example(self):
        """Test with a realistic JWT token structure"""
        # Simulate a real OpenMetadata JWT token payload
        payload = {
            "sub": "ingestion-bot",
            "iss": "open-metadata.org",
            "iat": 1663938462,
            "email": "ingestion-bot@open-metadata.org",
            "isBot": False,
        }

        payload_encoded = (
            base64.urlsafe_b64encode(json.dumps(payload).encode("utf-8"))
            .decode("utf-8")
            .rstrip("=")
        )

        jwt_token = f"eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.{payload_encoded}.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"

        result = decode_jwt_token(jwt_token)

        self.assertIsNotNone(result)
        self.assertEqual(result["sub"], "ingestion-bot")
        self.assertEqual(result["iss"], "open-metadata.org")
        self.assertEqual(result["email"], "ingestion-bot@open-metadata.org")
        self.assertEqual(result["isBot"], False)

    def test_get_entity_hierarchy_returns_dict(self):
        """Test that get_entity_hierarchy returns a dictionary"""
        hierarchy = get_entity_hierarchy()

        self.assertIsInstance(hierarchy, dict)
        self.assertGreater(len(hierarchy), 0, "Hierarchy should not be empty")

    def test_get_entity_hierarchy_caching(self):
        """Test that get_entity_hierarchy uses caching"""
        first_call = get_entity_hierarchy()
        second_call = get_entity_hierarchy()

        # Identity (not equality): both calls must hand back the same object
        self.assertIs(
            first_call,
            second_call,
            "Should return the same cached instance on subsequent calls",
        )

    def test_get_entity_hierarchy_database_service_order(self):
        """Test that database service entities are in correct hierarchical order"""
        db_service_depth = get_entity_hierarchy_depth(DatabaseService)
        database_depth = get_entity_hierarchy_depth(Database)
        schema_depth = get_entity_hierarchy_depth(DatabaseSchema)
        table_depth = get_entity_hierarchy_depth(Table)

        self.assertLess(
            db_service_depth,
            database_depth,
            "DatabaseService should come before Database in hierarchy",
        )
        self.assertLess(
            database_depth,
            schema_depth,
            "Database should come before DatabaseSchema in hierarchy",
        )
        self.assertLess(
            schema_depth,
            table_depth,
            "DatabaseSchema should come before Table in hierarchy",
        )

    def test_get_entity_hierarchy_messaging_service_order(self):
        """Test that messaging service entities are in correct hierarchical order"""
        messaging_service_depth = get_entity_hierarchy_depth(MessagingService)
        topic_depth = get_entity_hierarchy_depth(Topic)

        self.assertLess(
            messaging_service_depth,
            topic_depth,
            "MessagingService should come before Topic in hierarchy",
        )

    def test_get_entity_hierarchy_dashboard_service_order(self):
        """Test that dashboard service entities are in correct hierarchical order"""
        dashboard_service_depth = get_entity_hierarchy_depth(DashboardService)
        dashboard_depth = get_entity_hierarchy_depth(Dashboard)

        self.assertLess(
            dashboard_service_depth,
            dashboard_depth,
            "DashboardService should come before Dashboard in hierarchy",
        )

    def test_get_entity_hierarchy_pipeline_service_order(self):
        """Test that pipeline service entities are in correct hierarchical order"""
        pipeline_service_depth = get_entity_hierarchy_depth(PipelineService)
        pipeline_depth = get_entity_hierarchy_depth(Pipeline)

        self.assertLess(
            pipeline_service_depth,
            pipeline_depth,
            "PipelineService should come before Pipeline in hierarchy",
        )

    def test_get_entity_hierarchy_depth_unknown_entity(self):
        """Test that unknown entity types return a default depth"""

        # Local class that is not part of any hierarchy
        class UnknownEntity:
            pass

        depth = get_entity_hierarchy_depth(UnknownEntity)
        self.assertEqual(depth, 999, "Unknown entities should return depth 999")

    def test_get_entity_hierarchy_depth_all_entities_have_valid_depth(self):
        """Test that all entities in the hierarchy have valid (non-negative) depths"""
        hierarchy = get_entity_hierarchy()

        for entity_type, depth in hierarchy.items():
            with self.subTest(entity_type=entity_type.__name__):
                self.assertIsInstance(
                    depth, int, f"{entity_type.__name__} depth should be an integer"
                )
                self.assertGreaterEqual(
                    depth, 0, f"{entity_type.__name__} depth should be non-negative"
                )

    def test_get_entity_hierarchy_services_at_root(self):
        """Test that the listed service types are at depth 0 (root level)"""
        service_types = [
            DatabaseService,
            DashboardService,
            MessagingService,
            PipelineService,
        ]

        for service_type in service_types:
            with self.subTest(service_type=service_type.__name__):
                depth = get_entity_hierarchy_depth(service_type)
                self.assertEqual(
                    depth,
                    0,
                    f"{service_type.__name__} should be at root level (depth 0)",
                )
